# Only required for google colab
from google.colab import drive
drive.mount('/content/drive')
!pip install livelossplot
!pip install -U seaborn # colab version is old
cd '/content/drive/My Drive'
# perform the necessary imports
import pandas as pd
import numpy as np
import os
from keras.preprocessing import image
from keras.applications.resnet50 import ResNet50, preprocess_input
from keras.models import Sequential, load_model
from keras.layers import Dense, GlobalAveragePooling2D, Flatten, Dropout, BatchNormalization
from keras.callbacks import ModelCheckpoint
from keras import optimizers
import keras.backend as K
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
from tqdm import tqdm_notebook
sns.set()
%matplotlib inline
from livelossplot import PlotLossesKeras
# from face_detector import detect_face uncomment if face detection is required. dlib is required for face detection to work
# specify constants
# Raw cropped-face images, named UTKFace-style: "age_gender_race_date.ext".
image_location = './data/cropped_faces/'
# Where intermediate artifacts (CSV splits, tensors, features) are written.
processed_folder = './processed_data/cropped_faces/'
random_state = 3  # fixed seed so the train/valid/test split is reproducible
train_size = 1000
test_size = 1000
valid_size = 500
# Pre-trained ResNet50 without its classification head; pooling=None keeps the
# full spatial feature map so we can Flatten with more data later.
feature_extractor = ResNet50(weights='imagenet', include_top=False, pooling=None)
target_size = (224, 224)  # input size expected by ResNet50
features_folder = processed_folder + 'resnet50/'
# warning is due to changes in keras 2.2.0, we will go without the pooling layer so that we can Flatten with more data.
files = [name for name in os.listdir(image_location)]
def process_file_name(name):
    """Parse a UTKFace-style file name 'age_gender_race_date.ext'.

    Returns [name, age, gender, race] with the three fields as ints.
    If the name does not have exactly four '_'-separated parts, or any of
    the first three parts is not an integer, returns [name, None, None, None]
    so the row can be dropped later.
    """
    split_name = name.split('_')
    # in case we run into problems, just put None for all values so we drop it later
    if len(split_name) == 4:
        try:
            return [name] + [int(part) for part in split_name[:3]]
        except ValueError:
            # 4 parts but a non-numeric field; fall through to the None row
            pass
    return [name, None, None, None]
# Parse every file name; rows that failed parsing carry Nones and are
# removed by dropna().
processed_files = [process_file_name(name) for name in files]
data = pd.DataFrame(columns=['Name', 'Age', 'Gender', 'Race'], data=processed_files).dropna()
targets = data['Age']  # regression target: the person's age
# let's see some details about our data
data.describe()
data.head()
# Shuffle once with the fixed seed, then cut into train / validation / test.
train_data, valid_data, test_data = np.split(data.sample(n=train_size+valid_size+test_size, random_state=random_state), [train_size, train_size+valid_size])
train_targets, valid_targets, test_targets = train_data['Age'], valid_data['Age'], test_data['Age']
# confirm we got the sizes we aimed for
assert len(train_data) == train_size
assert len(valid_data) == valid_size
assert len(test_data) == test_size
# let's save the data, so we can use it later
data.to_csv(processed_folder + 'full_data.csv')
train_data.to_csv(processed_folder + 'train_data.csv')
valid_data.to_csv(processed_folder + 'valid_data.csv')
test_data.to_csv(processed_folder + 'test_data.csv')
def show_datset_distribution(data, title, save_file_name=None):
    """Plot age histogram plus gender and race count charts side by side.

    data: DataFrame with numeric 'Age', 'Gender' (0 = male, else female) and
    'Race' (0-4) columns. If save_file_name is given, the figure is saved.
    """
    # the chart looks better with proper names
    def add_label_data_for_chart(data):
        def convert_race(r):
            # race encoding used in the UTKFace file names
            if r == 0:
                return 'White'
            if r == 1:
                return 'Black'
            if r == 2:
                return 'Asian'
            if r == 3:
                return 'Indian'
            if r == 4:
                return 'Other'
        data['Gender_Label'] = data['Gender'].apply(lambda g: 'Male' if g == 0 else 'Female')
        data['Race_Label'] = data['Race'].apply(lambda r: convert_race(r))
        return data
    # we don't want to modify the actual dataset
    data = data.copy()
    # add descriptive labels
    add_label_data_for_chart(data)
    # let's draw
    figure,ax=plt.subplots(1,3, figsize=(20,5))
    figure.suptitle(title, fontsize=16)
    data['Age'].plot(kind='hist', bins=30,title='Age distribution', ax=ax[0])
    ax[0].set_xlabel('Age')
    sns.catplot(x='Gender_Label', kind="count", data=data, ax=ax[1], order=['Male', 'Female']);
    ax[1].set_title('Gender Distribution')
    ax[1].set_xlabel('Gender')
    sns.catplot(x='Race_Label', kind="count", data=data, ax=ax[2], order=['White', 'Black', 'Asian', 'Indian', 'Other']);
    ax[2].set_title('Race Distribution')
    ax[2].set_xlabel('Race')
    # catplot opens its own figures in addition to drawing on our axes;
    # close the two extra empty figures it created.
    plt.close(2)
    plt.close(3)
    if save_file_name:
        plt.savefig(save_file_name)
None  # suppress notebook cell output
# Visualize the distributions of the full dataset and of each split.
show_datset_distribution(data, 'Complete Data Distribution', save_file_name='figures/full_data_distribution.png')
show_datset_distribution(train_data, 'Training Data Distribution')
show_datset_distribution(valid_data, 'Validation Data Distrubtion')
show_datset_distribution(test_data, 'Test Data Distribution')
def show_sample_ukt_images(data=data, show_pred=False):
    """Show a 4x4 grid of random face images titled with the true age.

    If show_pred is True, `data` must also carry a 'predicted_age' column,
    shown next to the true age.
    NOTE: the default `data` is bound to the module-level DataFrame at
    definition time.
    """
    sample_faces = data.sample(n=16)
    fig, axes = plt.subplots(4,4, figsize=(20,20))
    for i in range(16):
        sample_face = sample_faces.iloc[i]
        file_name = image_location + sample_face['Name']
        ax = axes[i//4, i%4] # figure out the location
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        if not show_pred:
            ax.set_title('Age:' + str(sample_face['Age']))
        else:
            ax.set_title('Age:' + str(sample_face['Age']) + ' Predicted: ' + str(sample_face['predicted_age']))
        ax.imshow(image.load_img(file_name))
show_sample_ukt_images()
# methods obtained from dog breed classifier project (with modification)
def path_to_tensor(file, target_size):
    """Load one image file and return it as a (1, h, w, 3) float array."""
    loaded = image.load_img(file, target_size=target_size)
    array = image.img_to_array(loaded)
    # prepend the batch dimension expected by Keras models
    return array[np.newaxis, ...]
def paths_to_tensor(files, target_size):
    """Stack one tensor per file into a single (n, h, w, 3) array."""
    return np.vstack([path_to_tensor(one_file, target_size) for one_file in files])
def extract_features(files, target_size):
    """Run the global ResNet50 feature extractor over the given image files."""
    tensors = paths_to_tensor(files, target_size)
    return feature_extractor.predict(preprocess_input(tensors))
def extract_features_from_tensors(tensors, target_size):
    """Run the global ResNet50 feature extractor over already-loaded tensors.

    `target_size` is unused; kept for interface symmetry with extract_features.
    """
    preprocessed = preprocess_input(tensors)
    return feature_extractor.predict(preprocessed)
def extract_features_for_dataset(data, file_prefix, target_size, save=True):
    """Extract ResNet50 features for every image named in `data`.

    When save is True the array is cached to
    '<features_folder><file_prefix>_data_features.npy'.
    """
    paths = [image_location + name for name in data['Name']]
    features = extract_features(paths, target_size)
    if save:
        np.save(features_folder + file_prefix + '_data_features', features)
    return features
# Extract and cache bottleneck features for each split; the bare .shape
# expressions only display the dimensions in the notebook.
train_data_features = extract_features_for_dataset(train_data, 'train', target_size)
train_data_features.shape
valid_data_features = extract_features_for_dataset(valid_data, 'valid', target_size)
valid_data_features.shape
test_data_features = extract_features_for_dataset(test_data, 'test', target_size)
test_data_features.shape
def load_input_features(features_folder):
    """Load the cached train/valid/test ResNet50 feature arrays."""
    splits = ('train', 'valid', 'test')
    return tuple(np.load(features_folder + split + '_data_features.npy') for split in splits)
def load_data(processed_folder):
    """Load the saved CSV splits.

    Returns (train_data, train_targets, valid_data, valid_targets,
    test_data, test_targets) where targets are the 'Age' columns.
    """
    result = []
    for split in ('train', 'valid', 'test'):
        frame = pd.read_csv(processed_folder + split + '_data.csv')
        result += [frame, frame['Age']]
    return tuple(result)
# Reload cached features and data splits (lets the notebook resume from here).
train_data_features, valid_data_features, test_data_features = load_input_features(features_folder)
train_data, train_targets, valid_data, valid_targets, test_data, test_targets = load_data(processed_folder)
def load_tensors(processed_folder):
    """Load the cached raw image tensors for the three splits."""
    splits = ('train', 'valid', 'test')
    return tuple(np.load(processed_folder + split + '_data_tensors.npy') for split in splits)
def load_augmented_input_features(features_folder):
    """Load augmented train/valid features plus the plain test features.

    The test split is never augmented, so its original feature file is used.
    """
    train = np.load(features_folder + 'train_data_features_augmented.npy')
    valid = np.load(features_folder + 'valid_data_features_augmented.npy')
    test = np.load(features_folder + 'test_data_features.npy')
    return train, valid, test
def load_augmented_data(processed_folder):
    """Load the augmented train/valid CSV splits and the plain test split.

    Returns (train_data_augmented, train_targets_augmented,
    valid_data_augmented, valid_targets_augmented, test_data, test_targets).
    """
    result = []
    for file_name in ('train_data_augmented', 'valid_data_augmented', 'test_data'):
        frame = pd.read_csv(processed_folder + file_name + '.csv')
        result += [frame, frame['Age']]
    return tuple(result)
# Reload the augmented features and splits.
train_data_features_augmented, valid_data_features_augmented, test_data_features = load_augmented_input_features(features_folder)
train_data_augmented, train_targets_augmented, valid_data_augmented, valid_targets_augmented, test_data, test_targets = load_augmented_data(processed_folder)
# only use if you are only dealing with augmented data; this overrides
# train_data_features and others. Required because some methods use them
# as default arguments.
train_data_features = train_data_features_augmented
train_targets = train_targets_augmented
valid_data_features = valid_data_features_augmented
valid_targets = valid_targets_augmented
def soft_accuracy_10(y_true, y_pred):
    """Fraction of predictions within 10 years of the true age."""
    return K.mean(K.abs(y_pred - y_true) < 10)
def soft_accuracy_5(y_true, y_pred):
    """Fraction of predictions within 5 years of the true age."""
    return K.mean(K.abs(y_pred - y_true) < 5)
def tilted_loss(y, f, q=.7):
    """Quantile (pinball) loss at quantile q.

    code based on https://sachinruk.github.io/blog/Quantile-Regression/
    """
    error = y - f
    # under-prediction is weighted by q, over-prediction by (1 - q)
    return K.mean(K.maximum(q * error, (q - 1) * error), axis=-1)
# some methods to avoid repetition
def run_model(model, x=train_data_features, y=train_targets, x_val=valid_data_features, y_val=valid_targets):
    """Train `model` for 100 epochs with live loss plotting.

    Relies on the module-level `checkpointer` callback being set up
    before the call; defaults are bound to the module-level splits.
    """
    callbacks = [checkpointer, PlotLossesKeras()]
    model.fit(x, y,
              validation_data=(x_val, y_val),
              epochs=100,
              batch_size=32,
              callbacks=callbacks,
              verbose=1)
def evaluate_model(model, x=test_data_features, y=test_targets):
    """Evaluate `model` and print each metric as 'name: value'."""
    scores = model.evaluate(x=x, y=y)
    for name, value in zip(model.metrics_names, scores):
        print('{}: {}'.format(name, value))
def save_base_tensor(data, file_location):
    """Save the raw image tensors for `data`; used by the naive benchmark.

    The extra '/' in the path is harmless duplication, since
    image_location already ends with one.
    """
    paths = [image_location + '/' + name for name in data['Name']]
    data_tensors = paths_to_tensor(paths, target_size)
    np.save(file_location, data_tensors)
    return data_tensors
# Folder for the naive-benchmark checkpoint. NOTE: the tensors below are
# saved under processed_folder directly, not under this folder.
naive_benchmark_folder = processed_folder + '/naive_benchmark/'
train_data_tensors = save_base_tensor(train_data, processed_folder + 'train_data_tensors')
valid_data_tensors = save_base_tensor(valid_data, processed_folder + 'valid_data_tensors')
test_data_tensors = save_base_tensor(test_data, processed_folder + 'test_data_tensors')
None  # suppress notebook cell output
# load already saved tensors (if necessary)
train_data_tensors, valid_data_tensors, test_data_tensors = load_tensors(processed_folder)
def naive_model(input_shape):
    """Naive benchmark: flatten raw pixels into one 10-unit ReLU layer."""
    net = Sequential([
        Flatten(input_shape=input_shape),
        Dense(10, activation='relu'),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# Build, train and evaluate the naive pixel-based benchmark.
model = naive_model(train_data_tensors.shape[1:])
model.summary()
checkpointer = ModelCheckpoint(filepath=naive_benchmark_folder + 'weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model, x=train_data_tensors, x_val=valid_data_tensors)
evaluate_model(model, x=test_data_tensors)
def cnn_v1_model(input_shape):
    """v1: single 20-unit hidden layer on top of the ResNet50 features."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dense(20, activation='relu'),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v1: two training rounds on the plain features, then one on the augmented set.
model = cnn_v1_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v1_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
Let us add two batch-normalization layers and two dropout layers, and see how they change the results.
def cnn_v2_model(input_shape):
    """v2: v1 plus batch norm and dropout (.3 / .5) around the hidden layer."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dropout(.3),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dropout(.5),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v2: same schedule as v1 (plain x2, then augmented).
model = cnn_v2_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v2_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
Let us increase the dropout rate of our first dropout layer and observe the effect.
def cnn_v3_model(input_shape):
    """v3: like v2 but with the first dropout raised to .5."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dropout(.5),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dropout(.5),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v3: same schedule (plain x2, then augmented).
model = cnn_v3_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v3_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v4_model(input_shape):
    """v4: same as v3 but with a 10x smaller Adam learning rate."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dropout(.5),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dropout(.5),
        Dense(1),
    ])
    slow_adam = optimizers.Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    net.compile(loss='mae', optimizer=slow_adam, metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v4: one plain round, then one augmented round.
model = cnn_v4_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v4_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v5_model(input_shape):
    """v5: v3's architecture with Adam lr=0.001 and a small decay."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dropout(.5),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dropout(.5),
        Dense(1),
    ])
    decaying_adam = optimizers.Adam(lr=0.001, decay=1e-5)
    net.compile(loss='mae', optimizer=decaying_adam, metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v5: one plain round, then one augmented round.
model = cnn_v5_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v5_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v6_model(input_shape):
    """v6: v5's architecture trained with the quantile (tilted) loss."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dropout(.5),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dropout(.5),
        Dense(1),
    ])
    decaying_adam = optimizers.Adam(lr=0.001, decay=1e-5)
    # MAE is tracked as a metric since it is no longer the loss
    net.compile(loss=tilted_loss, optimizer=decaying_adam, metrics=['mae', soft_accuracy_10, soft_accuracy_5])
    return net
# v6: one plain round, then one augmented round.
model = cnn_v6_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v6_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v7_model(input_shape):
    """v7: batch normalization only, no dropout."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        BatchNormalization(),
        Dense(20, activation='relu'),
        BatchNormalization(),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v7: one plain round, then one augmented round.
model = cnn_v7_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v7_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v8_model(input_shape):
    """v8: dropout after the hidden layer only, no batch normalization."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dense(20, activation='relu'),
        Dropout(.5),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v8: one plain round, then one augmented round.
model = cnn_v8_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v8_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v9_model(input_shape):
    """v9: dropout (.3 / .5) on both sides of the hidden layer, no batch norm."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dropout(.3),
        Dense(20, activation='relu'),
        Dropout(.5),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v9: one plain round, then one augmented round.
model = cnn_v9_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v9_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v10_model(input_shape):
    """v10: two stacked 50-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(50, activation='relu') for _ in range(2)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v10: two plain rounds, then one augmented round.
model = cnn_v10_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v10_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v11_model(input_shape):
    """v11: three stacked 50-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(50, activation='relu') for _ in range(3)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v11: two plain rounds, then one augmented round.
model = cnn_v11_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v11_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v12_model(input_shape):
    """v12: input dropout then widening layers (50 -> 100)."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dropout(.5),
        Dense(50, activation='relu'),
        Dense(100, activation='relu'),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v12: two plain rounds, then one augmented round.
model = cnn_v12_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v12_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v13_model(input_shape):
    """v13: same architecture as v12, trained with Nadam."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dropout(.5),
        Dense(50, activation='relu'),
        Dense(100, activation='relu'),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='nadam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v13: two plain rounds, then one augmented round.
model = cnn_v13_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v13_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v14_model(input_shape):
    """v14: same architecture as v12, trained with Adadelta."""
    net = Sequential([
        Flatten(input_shape=input_shape[1:]),
        Dropout(.5),
        Dense(50, activation='relu'),
        Dense(100, activation='relu'),
        Dense(1),
    ])
    net.compile(loss='mae', optimizer='adadelta', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v14: two plain rounds, then one augmented round.
model = cnn_v14_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v14_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v15_model(input_shape):
    """v15: four stacked 50-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(50, activation='relu') for _ in range(4)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v15: one plain round, then two augmented rounds.
model = cnn_v15_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v15_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v16_model(input_shape):
    """v16: five stacked 50-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(50, activation='relu') for _ in range(5)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v16: one plain round, then one augmented round.
model = cnn_v16_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v16_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v17_model(input_shape):
    """v17: six stacked 50-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(50, activation='relu') for _ in range(6)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v17: one plain round, then one augmented round.
model = cnn_v17_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v17_augmentation_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v18_model(input_shape):
    """v18: input dropout, six 50-unit layers with batch norm in the middle."""
    layers = [Flatten(input_shape=input_shape[1:]), Dropout(.5)]
    layers += [Dense(50, activation='relu') for _ in range(3)]
    layers.append(BatchNormalization())
    layers += [Dense(50, activation='relu') for _ in range(3)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v18: one plain round, then one augmented round.
model = cnn_v18_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v18_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v19_model(input_shape):
    """v19: four stacked 100-unit ReLU layers."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(100, activation='relu') for _ in range(4)]
    layers.append(Dense(1))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v19: one plain round, then two augmented rounds.
model = cnn_v19_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v19_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
def cnn_v20_model(input_shape):
    """v20: v19 with normal-distribution kernel initialization everywhere."""
    layers = [Flatten(input_shape=input_shape[1:])]
    layers += [Dense(100, activation='relu', kernel_initializer='normal') for _ in range(4)]
    layers.append(Dense(1, kernel_initializer='normal'))
    net = Sequential(layers)
    net.compile(loss='mae', optimizer='adam', metrics=[soft_accuracy_10, soft_accuracy_5])
    return net
# v20: one plain round, then one augmented round.
model = cnn_v20_model(train_data_features.shape)
model.summary()
checkpointer = ModelCheckpoint(filepath=features_folder + 'v20_weights.best.hdf5',
                               verbose=1, save_best_only=True)
run_model(model)
evaluate_model(model)
run_model(model, x=train_data_features_augmented, y=train_targets_augmented)
evaluate_model(model)
Here we evaluate the need for image augmentation by looking at some sample modifications.
def process_augmentation_demo(input_path, im_generator, save_file_name=None):
    """Show a 4x4 grid of augmented variants of one image.

    `im_generator` is a Keras ImageDataGenerator; the figure is optionally
    saved to `save_file_name`.
    """
    img = image.img_to_array(image.load_img(input_path))
    img = img.reshape((1,) + img.shape)
    img_flow = im_generator.flow(img, batch_size=1)
    fig, axes = plt.subplots(4, 4, figsize=(20, 20))
    for i, batch in enumerate(img_flow):
        ax = axes[i // 4, i % 4]  # grid position
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax.imshow(image.array_to_img(batch[0], scale=True))
        # the generator yields forever; stop after 16 images
        if i == 15:
            break
    if save_file_name:
        plt.savefig(save_file_name)
# This is the image we will use as a sample
sample_image = image_location + train_data['Name'][0]
fig = plt.imshow(image.load_img(sample_image))
fig.axes.get_xaxis().set_visible(False)
fig.axes.get_yaxis().set_visible(False)
# rotation: This is not necessary since we use aligned photos, and plan to crop and align faces we receive as input
process_augmentation_demo(sample_image, image.ImageDataGenerator(rotation_range=30))
# shift range: This is not useful since we will center around the face using landmark detection.
process_augmentation_demo(sample_image, image.ImageDataGenerator(width_shift_range=50, height_shift_range=50))
# brightness_range: This is a useful one we can use
process_augmentation_demo(sample_image, image.ImageDataGenerator(brightness_range=[0, 1.5]))
# shear_range: We could possibly benefit from this, even though the shear is unnatural
process_augmentation_demo(sample_image, image.ImageDataGenerator(shear_range=30))
# zoom_range: A small value of this could be useful
process_augmentation_demo(sample_image, image.ImageDataGenerator(zoom_range=.2))
# channel_shift_range: Could make our data more robust
process_augmentation_demo(sample_image, image.ImageDataGenerator(channel_shift_range=70))
# flip: The vertical flip is not good, however, having a horizontal flip would be good
process_augmentation_demo(sample_image, image.ImageDataGenerator(vertical_flip=False, horizontal_flip=True))
# let's put the ones we want to use together
def get_augmentation_data():
    """Build the ImageDataGenerator combining the augmentations chosen above."""
    generator = image.ImageDataGenerator(
        # NOTE(review): a lower bound of 0 can yield an all-black image -- confirm intended
        brightness_range=[0, 1.5],
        shear_range=30,
        zoom_range=.2,
        channel_shift_range=70,
        vertical_flip=False,
        horizontal_flip=True,
    )
    return generator
# Demo the combined augmentation pipeline and save the figure.
process_augmentation_demo(sample_image, get_augmentation_data(), save_file_name='figures/augmentation_sample.png')
# we will generate 8 extra images from each image, this will multiply our training data 9-fold
augmentation_size = 8
def extract_augmentation_features(input_path, im_generator=get_augmentation_data(), augmentation_size=augmentation_size):
    """Generate `augmentation_size` augmented variants of one image and
    return their ResNet50 features.

    NOTE: the default generator is created once at definition time and is
    shared across calls.
    """
    img = image.img_to_array(image.load_img(input_path, target_size=target_size))
    batch = img.reshape((1,) + img.shape)
    tensors = []
    for i, variant in enumerate(im_generator.flow(batch, batch_size=1)):
        tensors.append(variant[0][np.newaxis, ...])
        # stop once we have the requested number of variants
        if i == augmentation_size - 1:
            break
    return extract_features_from_tensors(np.vstack(tensors), target_size)
def augment_data(data, data_features, augmentation_size=augmentation_size):
    """Augment a dataset with `augmentation_size` variants of every image.

    Returns (data_augmented, data_features_augmented): the original rows and
    features followed by the augmented copies. Augmented rows keep the
    original labels and get names prefixed with 'aug_<j>_'.
    """
    # Collect feature chunks and concatenate once at the end; the original
    # np.append-per-sample re-copied the whole array every iteration (O(n^2)).
    feature_chunks = [data_features]
    data_augmented = data.copy()
    for i, sample in tqdm_notebook(data.iterrows()):
        feature_chunks.append(extract_augmentation_features(image_location + sample['Name']))
        for j in range(augmentation_size):
            sample_copy = sample.copy()
            sample_copy['Name'] = 'aug_{}_{}'.format(j, sample['Name'])
            # NOTE(review): DataFrame.append is removed in pandas 2.x; fine
            # for the pandas this notebook targets.
            data_augmented = data_augmented.append(sample_copy)
    return data_augmented, np.concatenate(feature_chunks, axis=0)
# Augment the training split and cache the results.
# NOTE: the save/to_csv lines below are duplicated -- harmless but redundant.
train_data_augmented, train_data_features_augmented = augment_data(train_data, train_data_features)
# NOTE: reset_index returns a new frame; the result is discarded here.
train_data_augmented.reset_index(drop=True)
train_targets_augmented = train_data_augmented['Age']
np.save(features_folder + 'train_data_features_augmented', train_data_features_augmented)
train_data_augmented.to_csv(processed_folder + 'train_data_augmented.csv')
train_targets_augmented = train_data_augmented['Age']
np.save(features_folder + 'train_data_features_augmented', train_data_features_augmented)
train_data_augmented.to_csv(processed_folder + 'train_data_augmented.csv')
# Augment the validation split and cache the results.
valid_data_augmented, valid_data_features_augmented = augment_data(valid_data, valid_data_features)
valid_data_augmented.reset_index(drop=True)
valid_targets_augmented = valid_data_augmented['Age']
np.save(features_folder + 'valid_data_features_augmented', valid_data_features_augmented)
valid_data_augmented.to_csv(processed_folder + 'valid_data_augmented.csv')
# specify constants
adience_metadata_location = './data/adience_fold_0_data.csv' # that is enough for our purposes
adience_image_location = './data/adience/'
adience_processed_folder = './processed_data/adience/'
adience_features_folder = adience_processed_folder + 'resnet50/'
# Adience age-group labels, in ascending order; indexed by (age_cat - 1) below.
age_groups = ['0-2', '4-6', '8-13', '15-20', '25-32', '38-43', '48-53', '60+']
# load adience data
def load_adience_data(adience_processed_folder=adience_processed_folder):
    """Load the preprocessed Adience metadata CSV."""
    path = adience_processed_folder + 'adience_data_modified.csv'
    return pd.read_csv(path)
def load_adience_features(adience_features_folder=adience_features_folder):
    """Load the cached ResNet50 features for the Adience images."""
    path = adience_features_folder + 'adience_data_features.npy'
    return np.load(path)
# Load the cached Adience data/features (if previously processed).
adience_data = load_adience_data()
adience_features = load_adience_features()
# Re-read the raw tab-separated metadata -- this overwrites the frame loaded
# just above -- and drop rows without an age label.
adience_data = pd.read_csv(adience_metadata_location, sep='\t')
adience_data = adience_data[adience_data['age'] != 'None']
adience_data.describe()
adience_data.head()
# support methods
# get the file name from a row in the metadata
def get_file_name(row, adience_image_location=adience_image_location):
    """Build the on-disk path of the aligned Adience face for a metadata row."""
    relative = '{}/coarse_tilt_aligned_face.{}.{}'.format(
        row['user_id'], row['face_id'], row['original_image'])
    return adience_image_location + relative
# show a single image
def show_image(img):
    """Display `img` with both axes hidden."""
    axes_image = plt.imshow(img)
    axes_image.axes.get_xaxis().set_visible(False)
    axes_image.axes.get_yaxis().set_visible(False)
# show some sample images
def show_sample_adience_images(adience_data=adience_data, show_pred=False, save_file_name=None):
    """Show a 4x4 grid of random Adience faces titled with their age group.

    When show_pred is True, `adience_data` must carry a 'predicted_cat'
    column, shown next to the true group. Optionally saves the figure.
    """
    samples = adience_data.sample(n=16)
    fig, axes = plt.subplots(4, 4, figsize=(20, 20))
    for i in range(16):
        row = samples.iloc[i]
        ax = axes[i // 4, i % 4]  # grid position
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        title = 'Age:' + age_groups[row['age_cat'] - 1]
        if show_pred:
            title += ' Predicted: ' + age_groups[row['predicted_cat'] - 1]
        ax.set_title(title)
        ax.imshow(image.load_img(get_file_name(row)))
    if save_file_name:
        plt.savefig(save_file_name)
# let us see some sample images
show_sample_adience_images()
# run face detection on every Adience image; detect_face comes from
# face_detector (its import is commented out at the top of the file —
# dlib must be installed for this cell to run)
tensors = []
for i, sample in tqdm_notebook(adience_data.iterrows()):
    # each detected face is expanded to a (1, ...) batch-shaped tensor
    tensors.append(np.expand_dims(detect_face(get_file_name(sample)), axis=0))
# separate images where a face was detected from those where it was not,
# falling back to the full image tensor when detection failed
cleaned_tensors = []
adience_data = adience_data.copy()
# fix: the original accumulated flags in pd.Series([]) (deprecated, defaults
# to float64) and assigned it as a column; since that Series is indexed
# 0..n-1 while adience_data's index is non-sequential after filtering, the
# index-aligned assignment could silently produce NaNs. A plain list is
# assigned positionally and avoids both problems.
face_detected = []
for i, tensor in enumerate(tensors):
    if tensor.shape == (1,):
        # detection returned nothing usable; load the raw image instead
        cleaned_tensors.append(path_to_tensor(get_file_name(adience_data.iloc[i]), target_size))
        face_detected.append(False)
    else:
        face_detected.append(True)
        cleaned_tensors.append(tensor)
adience_data['face_detected'] = face_detected
# notebook-cell output: peek at the rows where detection failed
adience_data[adience_data['face_detected'] == False].head()
# let us see some sample images that failed face detection; some of the faces are so clear
# it is very interesting they have not been detected by dlib
# however, some are not frontal images, or faces partially blocked
show_sample_adience_images(adience_data[adience_data['face_detected'] == False], save_file_name='figures/Adience_Faces_Not_Detected.png')
# extract ResNet50 features for all tensors and cache everything to disk
# so later cells can reload via load_adience_data()/load_adience_features()
adience_features = extract_features_from_tensors(np.vstack(cleaned_tensors), target_size)
np.save(adience_features_folder + 'adience_data_features', adience_features)
adience_data.to_csv(adience_processed_folder + 'adience_data_modified.csv')
# convert to categories (0-2, 4-6, 8-13, 15-20, 25-32, 38-43, 48-53, 60-)
# Adience range-label strings mapped to our eight category ids
_ADIENCE_LABEL_TO_CATEGORY = {
    '(0, 2)': 1,
    '(4, 6)': 2,
    '(8, 12)': 3,
    '(15, 20)': 4,
    '(25, 32)': 5,
    '(38, 43)': 6,
    '(38, 48)': 6,
    '(48, 53)': 7,
    '(60, 100)': 8,
}
# inclusive upper bounds of the numeric age ranges for categories 1-7;
# anything above the last bound falls into category 8
_AGE_UPPER_BOUNDS = [3, 7, 13.5, 22.5, 35, 45.5, 56.5]

def to_category(val):
    """Map an age to a category id in 1..8.

    Args:
        val: either one of the Adience range-label strings (e.g. '(25, 32)')
            or a numeric age / numeric string.

    Returns:
        int category id, 1 through 8.

    Raises:
        ValueError: if *val* is a string that is neither a known range
            label nor parseable by int() (same as the original fall-through).
    """
    if isinstance(val, str):
        category = _ADIENCE_LABEL_TO_CATEGORY.get(val)
        if category is not None:
            return category
        # unknown label: fall through and try to parse it as a number
    val = int(val)
    for category, bound in enumerate(_AGE_UPPER_BOUNDS, start=1):
        if val <= bound:
            return category
    return 8
# map every raw age label/number onto one of the eight categories
# (fix: the lambda wrapper around to_category was redundant)
adience_data['age_cat'] = adience_data['age'].apply(to_category)
# let us see how the adience data is distributed
def show_distribution():
    """Plot the per-category sample count of the (module-level) adience_data."""
    fig, ax = plt.subplots(figsize=(15,5))
    # NOTE(review): catplot is a figure-level seaborn function; passing ax=
    # also opens an extra figure, which plt.close(2) below disposes of.
    # Newer seaborn versions may ignore ax= entirely — verify.
    g = sns.catplot(x='age_cat', kind="count", data=adience_data, ax=ax);
    ax.set_xticklabels(age_groups)
    ax.set_title('Age Distribution')
    ax.set_xlabel('Age')
    # close the stray extra figure (figure number 2)
    plt.close(2)
# render the category histogram for the Adience set
show_distribution()
def evaluate_against_adience(model, adience_data=adience_data):
    """Benchmark *model* against the Adience category labels.

    Predicts ages from the module-level ``adience_features``, converts each
    prediction to a category with to_category, and compares against the
    frame's 'age_cat' column.

    Returns:
        (exact, one_off): fraction of predictions in the exact category,
        and fraction within one category of the truth.
    """
    age_predictions = model.predict(adience_features)
    # predictions come back as shape (n, 1); take the scalar from each row
    predictions = [to_category(p[0]) for p in age_predictions]
    total = len(predictions)
    exact_hits = 0
    one_off_hits = 0
    # (fix: removed an unused running counter from the original loop)
    for y_pred, y_true in zip(predictions, adience_data['age_cat']):
        diff = abs(y_pred - y_true)
        if diff == 0:
            exact_hits += 1
        elif diff == 1:
            one_off_hits += 1
    exact = exact_hits / total
    one_off = (exact_hits + one_off_hits) / total
    return exact, one_off
# checkpoint files for the 20 model versions trained on the base data;
# note the irregular '.2' suffixes on versions 1 and 12, so this list
# stays explicit
base_model_files = ['v1.2_weights.best', 'v2_weights.best', 'v3_weights.best', 'v4_weights.best',
                    'v5_weights.best', 'v6_weights.best', 'v7_weights.best', 'v8_weights.best',
                    'v9_weights.best', 'v10_weights.best', 'v11_weights.best', 'v12.2_weights.best',
                    'v13_weights.best', 'v14_weights.best', 'v15_weights.best', 'v16_weights.best',
                    'v17_weights.best', 'v18_weights.best', 'v19_weights.best', 'v20_weights.best']
# the augmented checkpoint names are perfectly regular — generate them
augmented_model_files = ['v{}_augmentation_weights.best'.format(v) for v in range(1, 21)]
# accumulator for the evaluation metrics, one row per model version
model_results = pd.DataFrame(columns=['version', 'soft_accuracy_5', 'soft_accuracy_10', 'mae',
                                      'augmented_soft_accuracy_5', 'augmented_soft_accuracy_10', 'augmented_mae',
                                      'adience_exact', 'adience_one_off', 'augmented_adience_exact', 'augmented_adience_one_off'])
# let's do it for all the versions we have (ResNet50)
# custom metrics/losses must be handed to load_model so Keras can
# deserialize the checkpoints (these names are defined earlier in the file)
custom_objects = {'soft_accuracy_5': soft_accuracy_5, 'soft_accuracy_10': soft_accuracy_10, 'tilted_loss': tilted_loss}
for version in range(1, 21):
    print('processing version #{}'.format(version))
    i = version - 1 # to use for indexing
    # load this version's base and augmented checkpoints
    base_model = load_model(features_folder + base_model_files[i] + '.hdf5', custom_objects=custom_objects)
    augmented_model = load_model(features_folder + augmented_model_files[i] + '.hdf5', custom_objects=custom_objects)
    # evaluate both on the held-out test features/targets
    base_metrics = base_model.evaluate(x=test_data_features, y=test_targets)
    augmented_metrics = augmented_model.evaluate(x=test_data_features, y=test_targets)
    # the indexes in the metrics array are the same, so just get them immediately
    base_mae = base_metrics[0]
    base_soft_accuracy_10 = base_metrics[1]
    base_soft_accuracy_5 = base_metrics[2]
    augmented_mae = augmented_metrics[0]
    augmented_soft_accuracy_10 = augmented_metrics[1]
    augmented_soft_accuracy_5 = augmented_metrics[2]
    if version == 6: # we use a different version of loss in this model, so the indexes are different
        base_mae = base_metrics[1]
        base_soft_accuracy_10 = base_metrics[2]
        base_soft_accuracy_5 = base_metrics[3]
        augmented_mae = augmented_metrics[1]
        augmented_soft_accuracy_10 = augmented_metrics[2]
        augmented_soft_accuracy_5 = augmented_metrics[3]
    # benchmark both models against the Adience category labels
    adience_exact, adience_one_off = evaluate_against_adience(base_model)
    augmented_adience_exact, augmented_adience_one_off = evaluate_against_adience(augmented_model)
    # append this version's row to the results frame
    model_results.loc[i] = [version, base_soft_accuracy_5, base_soft_accuracy_10, base_mae,
                            augmented_soft_accuracy_5, augmented_soft_accuracy_10, augmented_mae,
                            adience_exact, adience_one_off, augmented_adience_exact, augmented_adience_one_off]
# notebook-cell output: display the collected metrics table
model_results
# 'version' was stored as float via .loc assignment; make it an int again
model_results['version'] = model_results['version'].astype(int)
model_results.to_csv('model_results.csv')
def load_model_results():
    """Re-load the persisted per-version evaluation metrics."""
    results_path = 'model_results.csv'
    return pd.read_csv(results_path)
# bar chart: soft accuracies of every model version, base vs augmented data
model_results[['version', 'soft_accuracy_5', 'augmented_soft_accuracy_5', 'soft_accuracy_10' , 'augmented_soft_accuracy_10']].plot.bar(x='version', figsize=(17,5))
# fix: the legend labels were misspelled 'Augmeted'
legend = plt.legend(['Base Data Accuracy within 5 y', 'Augmented Data Accuracy within 5 y', 'Base Data Accuracy within 10 y', 'Augmented Data Accuracy within 10 y'],
                    loc=(1.01,0))
plt.title('Accuracy of the different model versions')
plt.xlabel('Model Version')
plt.ylabel('Accuracy')
plt.ylim(bottom=.3)
plt.savefig('figures/soft_accuracy_model_version.png', bbox_extra_artists=(legend,), bbox_inches='tight')
# trailing None suppresses the notebook cell's repr output
None
# bar chart: test-set MAE of every model version, base vs augmented data
model_results[['version', 'mae', 'augmented_mae']].plot.bar(x='version', figsize=(17,5))
legend = plt.legend(['Base Data MAE', 'Augmented Data MAE'], loc=(1.01,0))
plt.title('Mean Absolute Error of the different model versions')
plt.xlabel('Model Version')
plt.ylabel('Mean Absolute Error')
# clip the y-axis so outlier versions do not flatten the rest of the chart
plt.ylim(top=15)
plt.savefig('figures/mae_model_versions.png', bbox_extra_artists=(legend,), bbox_inches='tight')
# trailing None suppresses the notebook cell's repr output
None
# bar chart: exact / one-off Adience category accuracy per model version
model_results[['version', 'adience_exact', 'augmented_adience_exact', 'adience_one_off', 'augmented_adience_one_off']].plot.bar(x='version', figsize=(17,5))
legend = plt.legend(['Base Data Exact Category', 'Augmented Data Exact Category', 'Base Data One-Off Category', 'Augmented Data One-Off Category'], loc=(1.01,0))
plt.title('Category Accuracy Against Adience Data')
plt.xlabel('Model Version')
plt.ylabel('Accuracy')
plt.ylim(bottom=.3)
plt.savefig('figures/adience_benchmark_model_versions.png', bbox_extra_artists=(legend,), bbox_inches='tight')
# The model selected is version 19.
def load_best_model():
    """Load the selected best model: version 19 trained with augmentation."""
    # NOTE(review): the original comment here said "version 15", but the
    # checkpoint loaded is v19 — the comment has been corrected to match
    return load_model(features_folder + 'v19_augmentation_weights.best.hdf5', custom_objects={
        'soft_accuracy_5': soft_accuracy_5, 'soft_accuracy_10': soft_accuracy_10})
best_model = load_best_model()
y_pred = best_model.predict(test_data_features)
y_pred = np.squeeze(y_pred)
evaluate_model(best_model)
updated_test_data = test_data.copy()
# fix: assign the raw array positionally. Wrapping it in pd.Series(y_pred)
# aligns on a fresh 0..n-1 index, which does not match test_data's sampled
# (non-sequential) index and silently fills the column with NaNs.
updated_test_data['predicted_age'] = y_pred
# let us see some of the faces that failed us from the UKTFace dataset
over_30_diff = updated_test_data[abs(updated_test_data['Age'] - updated_test_data['predicted_age']) > 30]
show_sample_ukt_images(over_30_diff, show_pred=True)
# Let us see the images that failed us from the Adience dataset
# Many of these are misclassified due to having more than one photo in the page,
# wearing funny sunglasses, weird hats, or face painting.
# Also some of the photos are too blurry.
age_predictions = best_model.predict(adience_features)
adience_predictions_categories = [to_category(p[0]) for p in age_predictions]
# fix: assign the plain list positionally. Wrapping it in pd.Series(...)
# aligns on a fresh 0..n-1 index and silently yields NaNs wherever
# adience_data's index is non-sequential (it was filtered earlier).
adience_data['predicted_cat'] = adience_predictions_categories
show_sample_adience_images(adience_data[abs(adience_data['predicted_cat'] - adience_data['age_cat']) > 3], show_pred=True, save_file_name='figures/Misclassified_Adience.png')
def accuracy_within(test_data, x, save_file_name=None):
    """Overlayed age histograms: rows predicted within x years vs. beyond.

    Expects 'Age' and 'predicted_age' columns; optionally saves the figure
    to save_file_name before showing it.
    """
    # compute the absolute error once and reuse it for both masks
    abs_error = abs(test_data['Age'] - test_data['predicted_age'])
    within_x = test_data[abs_error <= x]
    over_x = test_data[abs_error > x]
    bins = np.linspace(0, 116, 100)
    fig, ax = plt.subplots(figsize=(15,5))
    ax.hist(within_x['Age'], bins, alpha=0.5, label='Accurate within {} years'.format(x))
    ax.hist(over_x['Age'], bins, alpha=0.5, label='Over {} year difference'.format(x))
    ax.legend(loc='upper right')
    ax.set_title('Frequency of Accurate estimation to {} Years Across Ages'.format(x))
    ax.set_xlabel('Age')
    ax.set_ylabel('Frequency')
    if save_file_name:
        plt.savefig(save_file_name)
    plt.show()
# histograms of estimation accuracy at several tolerance levels
accuracy_within(updated_test_data, 5, 'figures/accuracy_within_5_years.png')
accuracy_within(updated_test_data, 10, 'figures/accuracy_within_10_years.png')
accuracy_within(updated_test_data, 20, 'figures/accuracy_within_20_years.png')
def show_absolute_error_by_race_and_gender(test_data, save_file_name=None):
    """Bar chart of the absolute age error broken down by race and gender.

    Expects 'Age', 'predicted_age', 'Gender' (0 -> 'Male', else 'Female')
    and 'Race' (0-4) columns. The input frame is not modified; optionally
    saves the figure to save_file_name.
    """
    # idiom fix: dict lookup instead of the original if/elif chain;
    # .get returns None for unknown codes, matching the old fall-through
    race_names = {0: 'White', 1: 'Black', 2: 'Asian', 3: 'Indian', 4: 'Other'}

    # the chart looks better with proper names
    def add_label_data_for_chart(data):
        data['Gender_Label'] = data['Gender'].apply(lambda g: 'Male' if g == 0 else 'Female')
        data['Race_Label'] = data['Race'].apply(lambda r: race_names.get(r))
        return data

    # we don't want to modify the actual dataset
    test_data = test_data.copy()
    test_data['error'] = abs(test_data['Age'] - test_data['predicted_age'])
    # add descriptive labels
    add_label_data_for_chart(test_data)
    # let's draw
    figure, ax = plt.subplots(figsize=(20,5))
    figure.suptitle('Absolute Error by Race and Gender', fontsize=16)
    sns.barplot(x='Race_Label', hue='Gender_Label', y='error', data=test_data, ax=ax, order=['White', 'Black', 'Asian', 'Indian', 'Other']);
    ax.set_title('Gender Distribution')
    ax.set_xlabel('Race')
    ax.set_ylabel('Absolute Error')
    if save_file_name:
        plt.savefig(save_file_name)
    # close the stray extra figure, mirroring the pattern used elsewhere
    plt.close(2)
# chart the error breakdown and save it for the report
show_absolute_error_by_race_and_gender(updated_test_data, save_file_name='figures/absolute_error_by_race_and_gender.png')